{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "df953e1a-b48a-4eda-b099-c89c9449fdad",
   "metadata": {},
   "source": [
    "## Write a Hive table with Spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "429e5d1b",
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "import os\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "# SPARK_HOME is read here but is not used when building the session below.\n",
    "spark_home = os.getenv('SPARK_HOME')\n",
    "\n",
    "# Fail fast when the connector jar name is missing: os.getenv() returns None for an\n",
    "# unset variable, and the f-string below would then point spark.jars at '.../None'.\n",
    "gravitino_connector_jar = os.getenv('SPARK_CONNECTOR_JAR')\n",
    "if not gravitino_connector_jar:\n",
    "    raise RuntimeError('SPARK_CONNECTOR_JAR is not set; it must name the Gravitino Spark connector jar in /tmp/gravitino/packages/')\n",
    "\n",
    "# User name exported to the Hadoop client libraries through the environment.\n",
    "os.environ['HADOOP_USER_NAME']=\"anonymous\"\n",
    "\n",
    "spark = (\n",
    "    SparkSession.builder\n",
    "    .appName(\"PySpark SQL Example\")\n",
    "    # Gravitino Spark connector plugin and the jars loaded into the session.\n",
    "    .config(\"spark.plugins\", \"org.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin\")\n",
    "    .config(\"spark.jars\", f\"/tmp/gravitino/packages/iceberg-spark-runtime-3.4_2.12-1.5.2.jar,/tmp/gravitino/packages/{gravitino_connector_jar}\")\n",
    "    # Gravitino server endpoint and the metalake to use.\n",
    "    .config(\"spark.sql.gravitino.uri\", \"http://gravitino:8090\")\n",
    "    .config(\"spark.sql.gravitino.metalake\", \"metalake_demo\")\n",
    "    .config(\"spark.sql.gravitino.enableIcebergSupport\", \"true\")\n",
    "    # Iceberg REST catalog, registered in Spark under the name catalog_rest.\n",
    "    .config(\"spark.sql.catalog.catalog_rest\", \"org.apache.iceberg.spark.SparkCatalog\")\n",
    "    .config(\"spark.sql.catalog.catalog_rest.type\", \"rest\")\n",
    "    .config(\"spark.sql.catalog.catalog_rest.uri\", \"http://gravitino:9001/iceberg/\")\n",
    "    # Scheduling and warehouse settings.\n",
    "    .config(\"spark.locality.wait.node\", \"0\")\n",
    "    .config(\"spark.sql.warehouse.dir\", \"hdfs://hive:9000/user/hive/warehouse\")\n",
    "    .enableHiveSupport()\n",
    "    .getOrCreate()\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d70e6c04-bb61-4b5b-8525-41a4a5a34b54",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Make catalog_hive the current catalog, then list the databases visible in it.\n",
    "spark.sql(\"use catalog_hive\")\n",
    "hive_databases = spark.sql(\"show databases\")\n",
    "hive_databases.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4f3e6e17-31d4-40dd-953b-358c87fbf429",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Set up the demo database and a Parquet table partitioned by department.\n",
    "# Both CREATE statements use IF NOT EXISTS, so this cell can be re-run.\n",
    "spark.sql(\"CREATE DATABASE IF NOT EXISTS product;\")\n",
    "spark.sql(\"USE product;\")\n",
    "spark.sql(\n",
    "    \"CREATE TABLE IF NOT EXISTS employees (id INT, name STRING, age INT) \"\n",
    "    \"PARTITIONED BY (department STRING) STORED AS PARQUET;\"\n",
    ")\n",
    "\n",
    "# Display the resulting table definition.\n",
    "employees_description = spark.sql(\"DESC TABLE EXTENDED employees;\")\n",
    "employees_description.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "69ebc70e-1ad0-4ea3-88f1-8e7038160d9b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Write one static partition per department. INSERT OVERWRITE replaces the rows of\n",
    "# the named partition, so re-running this cell does not duplicate them.\n",
    "spark.sql(\n",
    "    \"INSERT OVERWRITE TABLE employees PARTITION(department='Engineering') \"\n",
    "    \"VALUES (1, 'John Doe', 30), (2, 'Jane Smith', 28);\"\n",
    ")\n",
    "spark.sql(\n",
    "    \"INSERT OVERWRITE TABLE employees PARTITION(department='Marketing') \"\n",
    "    \"VALUES (3, 'Mike Brown', 32);\"\n",
    ")\n",
    "\n",
    "# Read everything back to confirm both partitions were written.\n",
    "employees_rows = spark.sql(\"SELECT * from employees\")\n",
    "employees_rows.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "554c5267-be5c-4dd8-9106-f4379597c16a",
   "metadata": {},
   "source": [
    "## Query the table with Trino"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3b0d643f-1593-4f82-94a7-e6bb40a12be2",
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install trino"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "33cc8e5a-1e76-42c7-b65b-5672d25162be",
   "metadata": {},
   "outputs": [],
   "source": [
    "from trino.dbapi import connect\n",
    "\n",
    "# Create a Trino connector client.\n",
    "# http_scheme (not schema) is the argument that selects HTTP vs HTTPS. No default\n",
    "# schema is set on the connection: the Trino queries in this notebook use fully\n",
    "# qualified catalog.schema.table names.\n",
    "conn = connect(\n",
    "    host=\"trino\",\n",
    "    port=8080,\n",
    "    user=\"admin\",\n",
    "    catalog=\"catalog_hive\",\n",
    "    http_scheme=\"http\",\n",
    ")\n",
    "\n",
    "# Cursor reused by the later cells to run Trino queries.\n",
    "trino_client = conn.cursor()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3479ab03-00d5-4115-a43c-cf0a39411183",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the table written by Spark through Trino, filtering on the department column.\n",
    "engineering_query = (\n",
    "    \"SELECT * FROM catalog_hive.product.employees \"\n",
    "    \"WHERE department = 'Engineering'\"\n",
    ")\n",
    "print(trino_client.execute(engineering_query).fetchall())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b33ef14d-a3b7-48a0-a3e7-5c0daf6f7ee5",
   "metadata": {},
   "source": [
    "## Write data with Spark through the Iceberg REST service"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c05c7234-28e9-45e4-867c-2304febef730",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Switch to the Iceberg REST catalog and create the demo database and table.\n",
    "# IF NOT EXISTS keeps this cell re-runnable: a plain CREATE TABLE fails once the\n",
    "# customers table already exists.\n",
    "spark.sql(\"use catalog_rest;\")\n",
    "spark.sql(\"create database if not exists sales;\")\n",
    "spark.sql(\"use sales;\")\n",
    "spark.sql(\"create table if not exists customers (customer_id int, customer_name varchar(100), customer_email varchar(100));\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "836de98b-5107-4f6c-a0fc-73a1fcf23d6f",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Append two customers, one INSERT statement per row.\n",
    "# NOTE: these are plain INSERT INTO statements, so re-running the cell appends the rows again.\n",
    "spark.sql(\n",
    "    \"insert into customers (customer_id, customer_name, customer_email) \"\n",
    "    \"values (11,'Rory Brown','rory@123.com');\"\n",
    ")\n",
    "spark.sql(\n",
    "    \"insert into customers (customer_id, customer_name, customer_email) \"\n",
    "    \"values (12,'Jerry Washington','jerry@dt.com');\"\n",
    ")\n",
    "\n",
    "# Read the table back to confirm the writes.\n",
    "customers_rows = spark.sql(\"select * from customers\")\n",
    "customers_rows.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "74b8e45c-e962-4479-947f-478a9e7cc0c6",
   "metadata": {},
   "source": [
    "## Federated query with Trino across Hive and Iceberg"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "76057b91-c720-485f-ada7-2ae773fea311",
   "metadata": {},
   "outputs": [],
   "source": [
    "# One Trino statement that reads sales.customers from two catalogs.\n",
    "# UNION (rather than UNION ALL) removes duplicate rows from the combined result.\n",
    "federation_query = (\n",
    "    \"select * from catalog_hive.sales.customers \"\n",
    "    \"union \"\n",
    "    \"select * from catalog_iceberg.sales.customers\"\n",
    ")\n",
    "print(trino_client.execute(federation_query).fetchall())"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
